feat: add peerinfo#103
Conversation
|
Coverage (base → head): |
emlautarom1
left a comment
There was a problem hiding this comment.
For this PR in general is a matter of trust: I cannot really validate that the implementation is correct and I would not be able to debug if there were issues. Also, the structure is completely different to the Go implementation.
We can go with this approach but I wonder if relying on libp2p_stream wouldn't be easier to debug and result in less code to maintain. I would like to have a second opinion about it (cc. @iamquang95).
| futures = "0.3" | ||
| futures-timer = "3.0" |
There was a problem hiding this comment.
Dependency versions should be listed in the top level Cargo.toml file only.
There was a problem hiding this comment.
my bad, forgot to move
| // Encode message to protobuf bytes | ||
| let mut buf = Vec::with_capacity(msg.encoded_len()); | ||
| msg.encode(&mut buf) | ||
| .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; | ||
|
|
||
| // Write unsigned varint length prefix | ||
| let mut len_buf = unsigned_varint::encode::usize_buffer(); | ||
| let encoded_len = unsigned_varint::encode::usize(buf.len(), &mut len_buf); | ||
| stream.write_all(encoded_len).await?; | ||
|
|
||
| // Write protobuf bytes | ||
| stream.write_all(&buf).await?; | ||
| stream.flush().await |
There was a problem hiding this comment.
I haven't tried it yet but I think following is equivalent:
| // Encode message to protobuf bytes | |
| let mut buf = Vec::with_capacity(msg.encoded_len()); | |
| msg.encode(&mut buf) | |
| .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; | |
| // Write unsigned varint length prefix | |
| let mut len_buf = unsigned_varint::encode::usize_buffer(); | |
| let encoded_len = unsigned_varint::encode::usize(buf.len(), &mut len_buf); | |
| stream.write_all(encoded_len).await?; | |
| // Write protobuf bytes | |
| stream.write_all(&buf).await?; | |
| stream.flush().await | |
| let mut bb = bytes::BytesMut::with_capacity(MAX_MESSAGE_SIZE); | |
| request.encode_length_delimited(&mut bb); | |
| stream.write_all(&bb).await?; |
Maybe we could add a test for how the encoded data should look like?
There was a problem hiding this comment.
I think it's not right way to do, since stream could have mulltiple requests and when you read it to end you will skip them
There was a problem hiding this comment.
This is for writing so it should not be problematic. As for reading I agree, the stream might contain multiple encoded messages. The prost library contains some helper functions to read the lengths; we might be able to drop the unsigned-varint altogether.
| // Read unsigned varint length prefix | ||
| let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { | ||
| unsigned_varint::io::ReadError::Io(io_err) => io_err, | ||
| other => io::Error::new(io::ErrorKind::InvalidData, other), | ||
| })?; | ||
|
|
||
| if msg_len > MAX_MESSAGE_SIZE { | ||
| return Err(io::Error::new( | ||
| io::ErrorKind::InvalidData, | ||
| format!("message too large: {msg_len} bytes (max: {MAX_MESSAGE_SIZE})"), | ||
| )); | ||
| } | ||
|
|
||
| // Read exactly `msg_len` protobuf bytes | ||
| let mut buf = vec![0u8; msg_len]; | ||
| stream.read_exact(&mut buf).await?; | ||
|
|
||
| // Unmarshal protobuf | ||
| M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) |
There was a problem hiding this comment.
Same here:
| // Read unsigned varint length prefix | |
| let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { | |
| unsigned_varint::io::ReadError::Io(io_err) => io_err, | |
| other => io::Error::new(io::ErrorKind::InvalidData, other), | |
| })?; | |
| if msg_len > MAX_MESSAGE_SIZE { | |
| return Err(io::Error::new( | |
| io::ErrorKind::InvalidData, | |
| format!("message too large: {msg_len} bytes (max: {MAX_MESSAGE_SIZE})"), | |
| )); | |
| } | |
| // Read exactly `msg_len` protobuf bytes | |
| let mut buf = vec![0u8; msg_len]; | |
| stream.read_exact(&mut buf).await?; | |
| // Unmarshal protobuf | |
| M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) | |
| // Read unsigned varint length prefix | |
| let mut buffer = Vec::with_capacity(MAX_MESSAGE_SIZE); | |
| stream.read_to_end(&mut buffer).await?; | |
| M::decode_length_delimited(buffer.as_slice()) |
| pub async fn send_peer_info( | ||
| mut stream: Stream, | ||
| request: &PeerInfo, | ||
| ) -> io::Result<(Stream, PeerInfo)> { |
There was a problem hiding this comment.
Could we use the following signature?
| pub async fn send_peer_info( | |
| mut stream: Stream, | |
| request: &PeerInfo, | |
| ) -> io::Result<(Stream, PeerInfo)> { | |
| pub async fn send_peer_info(stream: &mut Stream, request: &PeerInfo) -> io::Result<PeerInfo> { |
There was a problem hiding this comment.
No, since it will run as a future which should own a stream, otherwise it could lead to lifetime issues
| const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); | ||
|
|
||
| /// Default timeout for peer info requests. | ||
| const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); |
There was a problem hiding this comment.
From where are we getting these constants?
There was a problem hiding this comment.
For interval: https://github.com/ObolNetwork/charon/blob/54ec4d915806130b824e5f8497cf074cdfbcedfa/app/peerinfo/peerinfo.go#L29
Timeout was set by me
| } | ||
| } | ||
|
|
||
| impl ConnectionHandler for Handler { |
There was a problem hiding this comment.
This is my biggest gripe with the approach: I have no way to review this code or assert that it's correct 😢
There was a problem hiding this comment.
We can setup a meeting and I will make a walkthrough the project if you want, but it's the only way to go. Also, it's the easiest module for us to implement among all others, so the sooner we start using native libp2p approach - the better. This approach provides the best code maintainability and resilience
emlautarom1
left a comment
There was a problem hiding this comment.
After some internal discussion we'll move forward with this PR. A few things to keep in mind:
- We need to eventually test edge case conditions like connections dropping mid message, peers disconecting and spamming peerinfo (potential unlimited vector growth)
- Add references to the
pingwhich was used as a base for this implementation' - Include some tests that validate peerinfo serialization/deserialization (ex. write multiple consecutive peerinfos to a stream and deserialize them)
Uses libp2p::ping as a reference https://github.com/libp2p/rust-libp2p/blob/4ee0e07dc01b5190026425d7e28872fefe0e6f3c/protocols/ping/src/handler.rs